Source code for hysop.numerics.fft.host_fft

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
OpenCl backend base interface for fast Fourier Transforms.

:class:`~hysop.numerics.host_fft.HostFFTI`
:class:`~hysop.numerics.host_fft.HostFFTPlanI`
:class:`~hysop.numerics.host_fft.HostFFTQueue`
"""

import psutil, functools
import numpy as np

from hysop import (
    __FFTW_NUM_THREADS__,
    __FFTW_PLANNER_EFFORT__,
    __FFTW_PLANNER_TIMELIMIT__,
    __DEFAULT_NUMBA_TARGET__,
)
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.numba_utils import (
    HAS_NUMBA,
    bake_numba_copy,
    bake_numba_accumulate,
    bake_numba_transpose,
)
from hysop.tools.hptt_utils import HAS_HPTT, hptt, can_exec_hptt, array_share_data
from hysop.tools.decorators import static_vars
from hysop.backend.host.host_array_backend import HostArrayBackend
from hysop.backend.host.host_array import HostArray
from hysop.numerics.fft.fft import FFTQueueI, FFTPlanI, FFTI

# Currently there is a bug for non contiguous arrays
# in numba so we disable it
HAS_NUMBA = False


[docs] class DummyEvent:
[docs] @classmethod def wait(cls): pass
[docs] class HostFFTQueue(FFTQueueI): """An host fft queue is just a tuple of callable objects.""" def __init__(self, name): self._name = name self._plans = () def __iadd__(self, *plans): for plan in plans: assert callable(plan) self._plans += (plan,) return self
[docs] def execute(self, wait_for=None): for plan in self._plans: plan() return DummyEvent
[docs] class HostFFTPlanI(FFTPlanI): """ Tag for FFT plans executing on host backend arrays. """ pass
[docs] class HostFFTI(FFTI): """ Abstract base for FFT interfaces targetting Host backends. """ def __init__(self, backend=None, allocator=None, **kwds): from hysop.backend.host.host_array_backend import HostArrayBackend if backend is None: backend = HostArrayBackend.get_or_create(allocator=allocator) if allocator is not None: mdg = "Host allocator does not match the one of the backend." assert backend.allocator is allocator check_instance(backend, HostArrayBackend) super().__init__(backend=backend, **kwds)
[docs] @classmethod def default_interface( cls, threads=None, backend=None, allocator=None, planner_effort=None, planning_timelimit=None, destroy_input=False, warn_on_allocation=True, warn_on_misalignment=True, error_on_allocation=False, **kwds, ): """ Get the default host FFT interface. Preferred interface is multithreaded MKL FFT with the TBB threading layer (does not work with Intel threading layer). On import error the interface falls back to a multithreaded FFTW interface with ESTIMATE planning effort. """ try: from hysop.numerics.fft._mkl_fft import MklFFT return MklFFT( backend=backend, allocator=allocator, destroy_input=destroy_input, warn_on_allocation=warn_on_allocation, error_on_allocation=error_on_allocation, **kwds, ) except ImportError: from hysop.numerics.fft.fftw_fft import FftwFFT threads = first_not_None(threads, __FFTW_NUM_THREADS__) planner_effort = first_not_None(planner_effort, __FFTW_PLANNER_EFFORT__) planning_timelimit = first_not_None( planning_timelimit, __FFTW_PLANNER_TIMELIMIT__ ) return FftwFFT( threads=threads, planner_effort=planner_effort, planning_timelimit=planning_timelimit, backend=backend, allocator=allocator, destroy_input=destroy_input, warn_on_allocation=warn_on_allocation, warn_on_misalignment=warn_on_misalignment, error_on_allocation=error_on_allocation, **kwds, )
[docs] def new_queue(self, tg, name): return HostFFTQueue(name=name)
[docs] def plan_copy(self, tg, src, dst): src = self.ensure_callable(src) dst = self.ensure_callable(dst) @static_vars(numba_copy=None) def exec_copy(src=src, dst=dst): src, dst = src(), dst() if can_exec_hptt(src, dst): hptt.tensorTransposeAndUpdate( perm=tuple(range(src.ndim)), alpha=1.0, A=src, beta=0.0, B=dst ) elif HAS_NUMBA: if exec_copy.numba_copy is None: exec_copy.numba_copy = bake_numba_copy(src=src, dst=dst) exec_copy.numba_copy() else: dst[...] = src return exec_copy
[docs] def plan_accumulate(self, tg, src, dst): src = self.ensure_callable(src) dst = self.ensure_callable(dst) @static_vars(numba_accumulate=None) def exec_accumulate(src=src, dst=dst): src, dst = src(), dst() if can_exec_hptt(src, dst): hptt.tensorTransposeAndUpdate( perm=range(src.ndim), alpha=1.0, A=src, beta=1.0, B=dst ) elif HAS_NUMBA: if exec_accumulate.numba_accumulate is None: exec_accumulate.numba_accumulate = bake_numba_accumulate( src=src, dst=dst ) exec_accumulate.numba_accumulate() else: dst[...] += src return exec_accumulate
[docs] def plan_transpose(self, tg, src, dst, axes): src = self.ensure_callable(src) dst = self.ensure_callable(dst) @static_vars(numba_transpose=None) def exec_transpose(src=src, dst=dst, axes=axes): src, dst = src(), dst() if can_exec_hptt(src, dst): hptt.tensorTransposeAndUpdate( perm=axes, alpha=1.0, A=src, beta=0.0, B=dst ) elif HAS_NUMBA: if exec_transpose.numba_transpose is None: exec_transpose.numba_transpose = bake_numba_transpose( src=src, dst=dst, axes=axes ) exec_transpose.numba_transpose() else: dst[...] = np.transpose(a=src, axes=axes) return exec_transpose
[docs] def plan_fill_zeros(self, tg, a, slices): assert slices a = self.ensure_callable(a) def exec_fill_zeros(a=a, slices=slices): buf = a() for slc in slices: buf[slc] = 0 return exec_fill_zeros
[docs] def plan_compute_energy( self, tg, fshape, src, dst, transforms, method="round", target=None, **kwds ): """ Plan to compute energy from src array to dst array using given transforms, method (round or weighted) and numba target. """ (N, NS2, C2C, R2C, S) = super().plan_compute_energy( tg, fshape, src, dst, transforms, **kwds ) dim = src.ndim # We do not forget the hermitian symmetry: # normally: E = 1/2 |Fi|**2 # but for a R2C transform: E = 1/2 |Fi|**2 + 1/2 |Fi*|**2 = 1 |Fi|**2 C = 2 ** (R2C - 1) if HAS_NUMBA: import numba as nb target = first_not_None(target, __DEFAULT_NUMBA_TARGET__) args = (src,) + N + NS2 + C2C + (C, S, dst) signature, layout = make_numba_signature(*args) if dim == 1: @nb.guvectorize( [signature], layout, target=target, nopython=True, cache=True ) def compute_energy_1d(FIN, Nx, NS2x, C2Cx, C, S, FOUT): for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[ix] Ei = C * (S * abs(Xi)) ** 2 FOUT[kx] += Ei exec_compute_energy = functools.partial(compute_energy_1d, *args) elif dim == 2: if method == "round": @nb.guvectorize( [signature], layout, target=target, nopython=True, cache=True ) def compute_energy_2d_round( FIN, Ny, Nx, NS2y, NS2x, C2Cy, C2Cx, C, S, FOUT ): for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iy, ix] Ei = C * (S * abs(Xi)) ** 2 k = int(round((kx**2 + ky**2) ** 0.5)) FOUT[k] += Ei exec_compute_energy = functools.partial( compute_energy_2d_round, *args ) elif method == "weighted": @nb.guvectorize( [signature], layout, target=target, nopython=True, cache=True ) def compute_energy_2d_weighted( FIN, Ny, Nx, NS2y, NS2x, C2Cy, C2Cx, C, S, FOUT ): for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = (kx**2 + ky**2) ** 0.5 i = int(k) j = min(i + 1, FOUT.size - 1) a = k - i FOUT[i] += (1 - a) * Ei FOUT[j] += a * Ei exec_compute_energy = functools.partial( compute_energy_2d_weighted, *args ) else: msg = f"Unknown method {method}." raise ValueError(msg) elif dim == 3: if method == "round": @nb.guvectorize( [signature], layout, target=target, nopython=True, cache=True ) def compute_energy_3d_round( FIN, Nz, Ny, Nx, NS2z, NS2y, NS2x, C2Cz, C2Cy, C2Cx, C, S, FOUT ): for iz in range(Nz): kz = (Nz - iz) if (C2Cz and iz > NS2z) else iz for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iz, iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = int(round((kx**2 + ky**2 + kz**2) ** 0.5)) FOUT[k] += Ei exec_compute_energy = functools.partial( compute_energy_3d_round, *args ) elif method == "weighted": @nb.guvectorize( [signature], layout, target=target, nopython=True, cache=True ) def compute_energy_3d_weighted( FIN, Nz, Ny, Nx, NS2z, NS2y, NS2x, C2Cz, C2Cy, C2Cx, C, S, FOUT ): for iz in range(Nz): kz = (Nz - iz) if (C2Cz and iz > NS2z) else iz for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iz, iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = (kx**2 + ky**2 + kz**2) ** 0.5 i = int(k) j = min(i + 1, FOUT.size - 1) a = k - i FOUT[i] += (1 - a) * Ei FOUT[j] += a * Ei exec_compute_energy = functools.partial( compute_energy_3d_weighted, *args ) else: msg = f"Unknown method {method}." raise ValueError(msg) else: msg = "{}D compute_energy has not been implemented yet." raise NotImplementedError(msg.format(dim)) else: if dim == 1: def compute_energy_1d(FIN, Nx, NS2x, C2Cx, C, S, FOUT): for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[ix] Ei = C * (S * abs(Xi)) ** 2 FOUT[kx] += Ei exec_compute_energy = functools.partial(compute_energy_1d, *args) elif dim == 2: if method == "round": def compute_energy_2d_round( FIN, Ny, Nx, NS2y, NS2x, C2Cy, C2Cx, C, S, FOUT ): for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iy, ix] Ei = C * (S * abs(Xi)) ** 2 k = int(round((kx**2 + ky**2) ** 0.5)) FOUT[k] += Ei exec_compute_energy = functools.partial( compute_energy_2d_round, *args ) elif method == "weighted": def compute_energy_2d_weighted( FIN, Ny, Nx, NS2y, NS2x, C2Cy, C2Cx, C, S, FOUT ): for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = (kx**2 + ky**2) ** 0.5 i = int(k) j = min(i + 1, FOUT.size - 1) a = k - i FOUT[i] += (1 - a) * Ei FOUT[j] += a * Ei exec_compute_energy = functools.partial( compute_energy_2d_weighted, *args ) else: msg = f"Unknown method {method}." raise ValueError(msg) elif dim == 3: if method == "round": def compute_energy_3d_round( FIN, Nz, Ny, Nx, NS2z, NS2y, NS2x, C2Cz, C2Cy, C2Cx, C, S, FOUT ): for iz in range(Nz): kz = (Nz - iz) if (C2Cz and iz > NS2z) else iz for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iz, iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = int(round((kx**2 + ky**2 + kz**2) ** 0.5)) FOUT[k] += Ei exec_compute_energy = functools.partial( compute_energy_3d_round, *args ) elif method == "weighted": def compute_energy_3d_weighted( FIN, Nz, Ny, Nx, NS2z, NS2y, NS2x, C2Cz, C2Cy, C2Cx, C, S, FOUT ): for iz in range(Nz): kz = (Nz - iz) if (C2Cz and iz > NS2z) else iz for iy in range(Ny): ky = (Ny - iy) if (C2Cy and iy > NS2y) else iy for ix in range(Nx): kx = (Nx - ix) if (C2Cx and ix > NS2x) else ix Xi = FIN[iz, iy, ix] Ei = C * ((S * abs(Xi)) ** 2) k = (kx**2 + ky**2 + kz**2) ** 0.5 i = int(k) j = min(i + 1, FOUT.size - 1) a = k - i FOUT[i] += (1 - a) * Ei FOUT[j] += a * Ei exec_compute_energy = functools.partial( compute_energy_3d_weighted, *args ) else: msg = f"Unknown method {method}." raise ValueError(msg) else: msg = "{}D compute_energy has not been implemented yet." raise NotImplementedError(msg.format(dim)) return exec_compute_energy
[docs] @classmethod def ensure_callable(cls, get_buffer): if callable(get_buffer): return get_buffer else: def get_buf(buf=get_buffer): return buf return get_buf